{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/pinecone-io/examples/blob/master/learn/generation/langchain/handbook/09-langchain-streaming/09-langchain-streaming.ipynb) [![Open nbviewer](https://raw.githubusercontent.com/pinecone-io/examples/master/assets/nbviewer-shield.svg)](https://nbviewer.org/github/pinecone-io/examples/blob/master/learn/generation/langchain/handbook/09-langchain-streaming/09-langchain-streaming.ipynb)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### [LangChain Handbook](https://pinecone.io/learn/langchain)\n",
    "\n",
    "# Streaming\n",
    "\n",
    "For LLMs, streaming has become an increasingly popular feature. The idea is to rapidly return tokens as an LLM is generating them, rather than waiting for a full response to be created before returning anything.\n",
    "\n",
    "Streaming is actually very easy to implement for simple use-cases, but it can get complicated when we start including things like Agents which have their own logic running which can block our attempts at streaming. Fortunately, we can make it work — it just requires a little extra effort.\n",
    "\n",
    "We'll start easy by implementing streaming to the terminal for LLMs, but by the end of the notebook we'll be handling the more complex task of streaming via FastAPI for Agents.\n",
    "\n",
    "First, let's install all of the libraries we'll be using."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install -qU \\\n",
    "    openai==0.28.0 \\\n",
    "    langchain==0.0.301 \\\n",
    "    fastapi==0.103.1 \\\n",
    "    \"uvicorn[standard]\"==0.23.2"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## LLM Streaming to Stdout"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The simplest form of streaming is to simply \"print\" the tokens as they're generated. To set this up we need to initialize an LLM (one that supports streaming, not all do) with two specific parameters:\n",
    "\n",
    "* `streaming=True`, to enable streaming\n",
    "* `callbacks=[SomeCallBackHere()]`, where we pass a LangChain callback class (or list containing multiple).\n",
    "\n",
    "The `streaming` parameter is self-explanatory. The `callbacks` parameter and callback classes less so — essentially they act as little bits of code that do something as each token from our LLM is generated. As mentioned, the simplest form of streaming is to print the tokens as they're being generated, like with the `StreamingStdOutCallbackHandler`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from langchain.chat_models import ChatOpenAI\n",
    "from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler\n",
    "\n",
    "os.environ[\"OPENAI_API_KEY\"] = os.getenv(\"OPENAI_API_KEY\") or \"YOUR_API_KEY\"\n",
    "\n",
    "llm = ChatOpenAI(\n",
    "    openai_api_key=os.getenv(\"OPENAI_API_KEY\"),\n",
    "    temperature=0.0,\n",
    "    model_name=\"gpt-3.5-turbo\",\n",
    "    streaming=True,  # ! important\n",
    "    callbacks=[StreamingStdOutCallbackHandler()]  # ! important\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now if we run the LLM we'll see the response being _streamed_."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain.schema import HumanMessage\n",
    "\n",
    "# create messages to be passed to chat LLM\n",
    "messages = [HumanMessage(content=\"tell me a long story\")]\n",
    "\n",
    "llm(messages)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "That was surprisingly easy, but things begin to get much more complicated as soon as we begin using agents. Let's first initialize an agent."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain.memory import ConversationBufferWindowMemory\n",
    "from langchain.agents import load_tools, AgentType, initialize_agent\n",
    "\n",
    "# initialize conversational memory\n",
    "memory = ConversationBufferWindowMemory(\n",
    "    memory_key=\"chat_history\",\n",
    "    k=5,\n",
    "    return_messages=True,\n",
    "    output_key=\"output\"\n",
    ")\n",
    "\n",
    "# create a single tool to see how it impacts streaming\n",
    "tools = load_tools([\"llm-math\"], llm=llm)\n",
    "\n",
    "# initialize the agent\n",
    "agent = initialize_agent(\n",
    "    agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,\n",
    "    tools=tools,\n",
    "    llm=llm,\n",
    "    memory=memory,\n",
    "    verbose=True,\n",
    "    max_iterations=3,\n",
    "    early_stopping_method=\"generate\",\n",
    "    return_intermediate_steps=False\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We already added our `StreamingStdOutCallbackHandler` to the agent as we initialized the agent with the same `llm` as we created with that callback. So let's see what we get when running the agent."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "prompt = \"Hello, how are you?\"\n",
    "\n",
    "agent(prompt)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Not bad, but we do now have the issue of streaming the _entire_ output from the LLM. Because we're using an agent, the LLM is instructed to output the JSON format we can see here so that the agent logic can handle tool usage, multiple \"thinking\" steps, and so on. For example, if we ask a math question we'll see this:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "agent(\"what is the square root of 71?\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It's interesting to see during development but we'll want to clean this streaming up a little in any actual use-case. For that we can go with two approaches — either we build a custom callback handler, or use a purpose built callback handler from LangChain (as usual, LangChain has something for everything). Let's first try LangChain's purpose-built `FinalStreamingStdOutCallbackHandler`.\n",
    "\n",
    "We will overwrite the existing `callbacks` attribute found here:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "agent.agent.llm_chain.llm"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With the new callback handler:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain.callbacks.streaming_stdout_final_only import (\n",
    "    FinalStreamingStdOutCallbackHandler,\n",
    ")\n",
    "\n",
    "agent.agent.llm_chain.llm.callbacks = [\n",
    "    FinalStreamingStdOutCallbackHandler(\n",
    "        answer_prefix_tokens=[\"Final\", \"Answer\"]\n",
    "    )\n",
    "]"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's try it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "agent(\"what is the square root of 71?\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Not quite there, we should really clean up the `answer_prefix_tokens` argument but it is hard to get right. It's generally easier to use a custom callback handler like so:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "\n",
    "class CallbackHandler(StreamingStdOutCallbackHandler):\n",
    "    def __init__(self):\n",
    "        self.content: str = \"\"\n",
    "        self.final_answer: bool = False\n",
    "\n",
    "    def on_llm_new_token(self, token: str, **kwargs: any) -> None:\n",
    "        self.content += token\n",
    "        if \"Final Answer\" in self.content:\n",
    "            # now we're in the final answer section, but don't print yet\n",
    "            self.final_answer = True\n",
    "            self.content = \"\"\n",
    "        if self.final_answer:\n",
    "            if '\"action_input\": \"' in self.content:\n",
    "                if token not in [\"}\"]:\n",
    "                    sys.stdout.write(token)  # equal to `print(token, end=\"\")`\n",
    "                    sys.stdout.flush()\n",
    "\n",
    "agent.agent.llm_chain.llm.callbacks = [CallbackHandler()]"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's try again:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "agent(\"what is the square root of 71?\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "agent.agent.llm_chain.llm"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It isn't perfect, but this is getting better. Now, in most scenarios we're unlikely to simply be printing output to a terminal or notebook. When we want to do something more complex like stream this data through another API, we need to do things differently."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using FastAPI with Agents"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In most cases we'll be placing our LLMs, Agents, etc behind something like an API. Let's add that into the mix and see how we can implement streaming for agents with FastAPI.\n",
    "\n",
    "First, we'll create a simple `main.py` script to contain our FastAPI logic. You can find it in the same GitHub repo location as this notebook ([here's a link](https://github.com/pinecone-io/examples/blob/langchain-streaming/learn/generation/langchain/handbook/09-langchain-streaming/main.py)).\n",
    "\n",
    "To run the API, navigate to the directory and run `uvicorn main:app --reload`. Once complete, you can confirm it is running by looking for the 🤙 status in the next cell output:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'status': '🤙'}"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import requests\n",
    "\n",
    "res = requests.get(\"http://localhost:8000/health\")\n",
    "res.json()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<Response [200]>"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "res = requests.get(\"http://localhost:8000/chat\",\n",
    "    json={\"text\": \"hello there!\"}\n",
    ")\n",
    "res"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'input': 'hello there!',\n",
       " 'chat_history': [],\n",
       " 'output': 'Hello! How can I assist you today?'}"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "res.json()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Unlike with our StdOut streaming, we now need to send our tokens to a generator function that feeds those tokens to FastAPI via a `StreamingResponse` object. To handle this we need to use async code, otherwise our generator will not begin emitting anything until _after_ generation is already complete.\n",
    "\n",
    "The `Queue` is accessed by our callback handler, as as each token is generated, it puts the token into the queue. Our generator function asyncronously checks for new tokens being added to the queue. As soon as the generator sees a token has been added, it gets the token and yields it to our `StreamingResponse`.\n",
    "\n",
    "To see it in action, we'll define a stream requests function called `get_stream`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_stream(query: str):\n",
    "    s = requests.Session()\n",
    "    with s.get(\n",
    "        \"http://localhost:8000/chat\",\n",
    "        stream=True,\n",
    "        json={\"text\": query}\n",
    "    ) as r:\n",
    "        for line in r.iter_content():\n",
    "            print(line.decode(\"utf-8\"), end=\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      " \"Hello! How can I assist you today?\"\n"
     ]
    }
   ],
   "source": [
    "get_stream(\"hi there!\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "ml",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.12"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
